Conversation
|
Caution Review failedThe pull request is closed. WalkthroughAdds a sharder accessor for native node-group checkpoint timestamp and uses it to: early-discard outdated standby-forward messages, tighten BackFill/cleanup eligibility to Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant ObjectCcMap
participant LruEntry
participant TemplateCcMap
participant Sharder
participant CcNode
Client->>ObjectCcMap: send StandbyForward / BackFill (commit_ts, payload)
ObjectCcMap->>LruEntry: locate entry / iterator (GetPage)
LruEntry->>Sharder: request NativeNodeGroupCkptTs()
Sharder->>CcNode: GetNodeGroupCkptTs(native_ng_)
CcNode-->>Sharder: ckpt_ts
Sharder-->>LruEntry: ckpt_ts
alt commit_ts <= ckpt_ts (outdated)
ObjectCcMap->>Client: finish/discard request (early-exit)
else commit_ts > ckpt_ts (accept)
ObjectCcMap->>TemplateCcMap: backfill/merge entry
TemplateCcMap->>LruEntry: update CommitTs / PayloadStatus
TemplateCcMap->>TemplateCcMap: set entry.page.last_dirty_commit_ts_ (if needed)
TemplateCcMap->>TemplateCcMap: propagate to map.last_dirty_commit_ts_ (if any updates)
ObjectCcMap->>Client: respond success
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~22 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
| commit_ts <= Sharder::Instance().NativeNodeGroupCkptTs()) | ||
| { | ||
| // Discard message since cce has a newer version. | ||
| // Discard message since cce has a newer version or has been |
There was a problem hiding this comment.
you cannot just discard the message if there is an older version in memory. You can only discard the message when you evict this cce right away and force next read to fetch from kv.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@tx_service/include/sharder.h`:
- Around line 533-536: Change the NativeNodeGroupCkptTs accessor to be a const
method and make it null-safe: update its declaration to uint64_t
NativeNodeGroupCkptTs() const and in the implementation check whether
native_cc_node_ is initialized before dereferencing; if native_cc_node_ is null
return a safe fallback (e.g., 0) and optionally emit a diagnostic/log in debug
builds (retain the assert only for debug). Ensure callers like IsPersistent()
continue to work with the new const signature.
- Around line 787-790: NativeNodeGroupCkptTs() currently dereferences
native_cc_node_ without holding cluster_cnf_mux_, risking a dangling pointer if
UpdateInMemoryClusterConfig() removes the entry; acquire a
std::shared_lock<std::shared_mutex> lk(cluster_cnf_mux_) at the start of
NativeNodeGroupCkptTs() (same pattern as GetCckptTs()) before using
native_cc_node_, and either change native_cc_node_ from a raw fault::CcNode* to
std::shared_ptr<fault::CcNode> (updating assignments where native_cc_node_ is
set) or add clear lifetime documentation/comments guaranteeing the node remains
in cluster_config_.cc_nodes_ for the sharder’s lifetime to avoid future
use-after-free.
🧹 Nitpick comments (1)
tx_service/src/sharder.cpp (1)
1013-1017: Add a defensive guard before dereferencing native_cc_node_.
assertis compiled out in release builds; a null pointer here would still crash. A simple readiness/null check avoids that and gives a safe fallback.🛡️ Suggested defensive check
uint64_t Sharder::NativeNodeGroupCkptTs() { - assert(native_cc_node_ != nullptr); - return native_cc_node_->GetCkptTs(); + if (!cc_nodes_init_.load(std::memory_order_acquire) || + native_cc_node_ == nullptr) + { + return 0; + } + return native_cc_node_->GetCkptTs(); }
| uint64_t GetNodeGroupCkptTs(uint32_t cc_ng_id); | ||
|
|
||
| uint64_t NativeNodeGroupCkptTs(); | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Find where native_cc_node_ is declared, set, and used
rg -n "native_cc_node_" --type cpp --type h -C 2Repository: eloqdata/tx_service
Length of output: 1009
🏁 Script executed:
# Find all call sites of NativeNodeGroupCkptTs()
rg -n "NativeNodeGroupCkptTs\s*\(" --type cpp --type h -B 2 -A 2Repository: eloqdata/tx_service
Length of output: 1394
🏁 Script executed:
# Locate the implementation of NativeNodeGroupCkptTs in the .cpp file
fd -e cpp | xargs grep -l "NativeNodeGroupCkptTs" | head -1 | xargs cat | head -100Repository: eloqdata/tx_service
Length of output: 3332
Make NativeNodeGroupCkptTs() const and guard against uninitialized native_cc_node_.
The assert() at line 1015 won't protect against null deref in release builds. In production, calling NativeNodeGroupCkptTs() before Init() or when the native node group isn't configured will crash. Additionally, this accessor should be const since it has no side effects and is called from const contexts like IsPersistent().
♻️ Suggested header tweak
- uint64_t NativeNodeGroupCkptTs();
+ uint64_t NativeNodeGroupCkptTs() const;Consider either: (1) adding explicit initialization checks with a fallback value, (2) documenting that this method must only be called after Init() completes and verifying that precondition at all call sites, or (3) using a guard pattern to safely access the underlying checkpoint timestamp.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| uint64_t GetNodeGroupCkptTs(uint32_t cc_ng_id); | |
| uint64_t NativeNodeGroupCkptTs(); | |
| uint64_t GetNodeGroupCkptTs(uint32_t cc_ng_id); | |
| uint64_t NativeNodeGroupCkptTs() const; |
🤖 Prompt for AI Agents
In `@tx_service/include/sharder.h` around lines 533 - 536, Change the
NativeNodeGroupCkptTs accessor to be a const method and make it null-safe:
update its declaration to uint64_t NativeNodeGroupCkptTs() const and in the
implementation check whether native_cc_node_ is initialized before
dereferencing; if native_cc_node_ is null return a safe fallback (e.g., 0) and
optionally emit a diagnostic/log in debug builds (retain the assert only for
debug). Ensure callers like IsPersistent() continue to work with the new const
signature.
| uint32_t node_id_; | ||
| uint32_t native_ng_; | ||
| fault::CcNode *native_cc_node_{}; | ||
| std::string host_name_; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's look at the full context of the Sharder class and the native_cc_node_ member
cat -n tx_service/include/sharder.h | sed -n '780,800p'Repository: eloqdata/tx_service
Length of output: 863
🏁 Script executed:
# Find where native_cc_node_ is used and how it's assigned
rg -n "native_cc_node_" tx_service/include/sharder.hRepository: eloqdata/tx_service
Length of output: 104
🏁 Script executed:
# Check how cluster_config_.cc_nodes_ is managed and updated
rg -n "cc_nodes_" tx_service/include/sharder.hRepository: eloqdata/tx_service
Length of output: 739
🏁 Script executed:
# Look for cluster config updates in the cpp file
rg -n "UpdateInMemoryClusterConfig|UpdateClusterConfig" tx_service/src/sharder.cpp -A 5 -B 2Repository: eloqdata/tx_service
Length of output: 1646
🏁 Script executed:
# Check how native_cc_node_ is initialized and used
rg -n "native_cc_node_" tx_service/src/sharder.cpp -B 2 -A 2Repository: eloqdata/tx_service
Length of output: 410
🏁 Script executed:
# Check the cluster_config_ type and cc_nodes_ container
rg -n "cluster_config_" tx_service/include/sharder.h | head -20Repository: eloqdata/tx_service
Length of output: 460
🏁 Script executed:
# Check UpdateInMemoryClusterConfig to see how cc_nodes_ is replaced
sed -n '1301,1350p' tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 1769
🏁 Script executed:
# Look for patterns where native_ng_ entry might be removed from cc_nodes_
rg -n "cc_nodes_\\.erase|cc_nodes_\\.clear" tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Check if native_cc_node_ is reassigned after config updates
rg -n "native_cc_node_\s*=" tx_service/src/sharder.cpp -B 2 -A 2Repository: eloqdata/tx_service
Length of output: 249
🏁 Script executed:
# Check where NativeNodeGroupCkptTs is called and verify synchronization
rg -n "NativeNodeGroupCkptTs" tx_service/src -A 3 -B 3Repository: eloqdata/tx_service
Length of output: 925
🏁 Script executed:
# Look for the full implementation of UpdateInMemoryClusterConfig
sed -n '1301,1400p' tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 3610
🏁 Script executed:
# Get more context around the native_cc_node_ assignment
sed -n '260,280p' tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 840
🏁 Script executed:
# Check what function contains this assignment
sed -n '250,280p' tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 1268
🏁 Script executed:
# Look for initialization or reassignment of native_cc_node_ in UpdateInMemoryClusterConfig
sed -n '1301,1400p' tx_service/src/sharder.cpp | grep -n "native_cc_node_\|native_ng_"Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Check if native_cc_node_ is reassigned anywhere else after initial assignment
rg -n "native_cc_node_" tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 217
🏁 Script executed:
# Look at the full context of where native_cc_node_ is assigned in the update flow
sed -n '240,290p' tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 1983
🏁 Script executed:
# Check if there's any synchronization in NativeNodeGroupCkptTs or callers
sed -n '1010,1025p' tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 464
🏁 Script executed:
# Check the caller in cc_entry.cpp for synchronization
sed -n '60,75p' tx_service/src/cc/cc_entry.cppRepository: eloqdata/tx_service
Length of output: 563
🏁 Script executed:
# Look for any pattern where cc_nodes_ might be cleared/reset entirely
rg -n "cc_nodes_\s*=" tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Check if UpdateInMemoryClusterConfig reassigns native_cc_node_ for changes to native_ng_
sed -n '1301,1400p' tx_service/src/sharder.cpp | tail -60Repository: eloqdata/tx_service
Length of output: 2168
🏁 Script executed:
# Check if there's any code path where cc_nodes_ entries are explicitly removed
rg -n "cc_nodes_\.erase" tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Verify if native_cc_node_ is ever reassigned after initial setup
rg -n "native_cc_node_\s*=" tx_service/src/ -A 2 -B 2Repository: eloqdata/tx_service
Length of output: 384
🏁 Script executed:
# Check if there's any thread synchronization around reads of native_cc_node_
rg -n "GetCkptTs" tx_service/src/ -B 5 -A 2Repository: eloqdata/tx_service
Length of output: 3306
🏁 Script executed:
# Look at the initialization function to understand the full flow
rg -n "void Sharder::Init" tx_service/src/sharder.cpp -A 3Repository: eloqdata/tx_service
Length of output: 45
🏁 Script executed:
# Check line numbers around the function containing native_cc_node_ assignment
sed -n '230,250p' tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 888
🏁 Script executed:
# Check if removed_ngs includes native_ng_ and if there's any protection
sed -n '1330,1360p' tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 1093
🏁 Script executed:
# Look for any other reassignments or safety checks for native_cc_node_
rg -n "native_ng_" tx_service/src/sharder.cpp -B 2 -A 2Repository: eloqdata/tx_service
Length of output: 2781
🏁 Script executed:
# Check if there's any guard preventing native_ng_ from being removed
sed -n '1330,1380p' tx_service/src/sharder.cppRepository: eloqdata/tx_service
Length of output: 1902
🏁 Script executed:
# Look for comments or documentation about native_ng_ lifetime guarantees
rg -n "native_ng_\|native_cc_node_" tx_service/include/sharder.h -B 2 -A 2Repository: eloqdata/tx_service
Length of output: 45
Add synchronization to NativeNodeGroupCkptTs() and reconsider raw pointer safety for native_cc_node_.
The raw pointer native_cc_node_ extracts ownership from cluster_config_.cc_nodes_ (a map of shared_ptr<fault::CcNode>), but NativeNodeGroupCkptTs() accesses it without holding cluster_cnf_mux_. This creates a data race: while one thread calls NativeNodeGroupCkptTs(), another thread executing UpdateInMemoryClusterConfig() could remove the native node group entry, causing the backing shared_ptr to be destroyed and leaving a dangling pointer. The TODO comment at line 899 in sharder.cpp explicitly questions whether the native node group is guaranteed to remain in the map after config changes.
Acquire std::shared_lock<std::shared_mutex> lk(cluster_cnf_mux_) in NativeNodeGroupCkptTs() before dereferencing native_cc_node_, consistent with GetCckptTs(). Additionally, consider using std::shared_ptr or documenting lifetime guarantees for native_cc_node_ to prevent future dangling pointer bugs.
🤖 Prompt for AI Agents
In `@tx_service/include/sharder.h` around lines 787 - 790, NativeNodeGroupCkptTs()
currently dereferences native_cc_node_ without holding cluster_cnf_mux_, risking
a dangling pointer if UpdateInMemoryClusterConfig() removes the entry; acquire a
std::shared_lock<std::shared_mutex> lk(cluster_cnf_mux_) at the start of
NativeNodeGroupCkptTs() (same pattern as GetCckptTs()) before using
native_cc_node_, and either change native_cc_node_ from a raw fault::CcNode* to
std::shared_ptr<fault::CcNode> (updating assignments where native_cc_node_ is
set) or add clear lifetime documentation/comments guaranteeing the node remains
in cluster_config_.cc_nodes_ for the sharder’s lifetime to avoid future
use-after-free.
* fix standby node ccentry eviction; * discard forward msg with commit_ts smaller than leader's ckpt_ts
…n checkpointed; override the cce if a newer version is Backfilled
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tx_service/include/cc/template_cc_map.h (1)
7966-8009:⚠️ Potential issue | 🟠 MajorAvoid advancing map dirty timestamp when no entries were updated.
last_dirty_commit_ts_is advanced unconditionally after the loop; when WAL is enabled and no commit_ts changes occur, this can mark the map dirty and disable page-skip optimizations in later data-sync scans. Consider gating the update on actual updates (ortxservice_skip_wal).Proposed fix
- uint64_t now_ts = shard_->Now(); + uint64_t now_ts = shard_->Now(); + bool updated_any = false; while (it != end_it) { CcEntry<KeyT, ValueT, VersionedRecord, RangePartitioned> *cce = it->second; auto *ccp = it.GetPage(); if (txservice_skip_wal) { // If wal log is disabled, we need to flush all in memory cache // to overwrite potential newer version in kv. RecordStatus status = cce->PayloadStatus(); // Invariant: // 1) Normal / Deleted => no buffered commands // 2) HasBufferedCommandList() => status == Unknown if (status == RecordStatus::Normal || status == RecordStatus::Deleted) { bool was_dirty = cce->IsDirty(); cce->SetCommitTsPayloadStatus(now_ts, status); OnCommittedUpdate(cce, was_dirty); assert(!cce->HasBufferedCommandList()); + updated_any = true; if (now_ts > ccp->last_dirty_commit_ts_) { ccp->last_dirty_commit_ts_ = now_ts; } } assert(!cce->HasBufferedCommandList() || status == RecordStatus::Unknown); } else { // if log is enabled, we only need to flush data after primary // last ckpt ts. if (cce->CommitTs() <= ckpt_ts) { bool was_dirty = cce->IsDirty(); cce->SetCkptTs(cce->CommitTs()); OnFlushed(cce, was_dirty); } } it++; } - if (now_ts > last_dirty_commit_ts_) + if (updated_any && now_ts > last_dirty_commit_ts_) { last_dirty_commit_ts_ = now_ts; }
Here are some reminders before you submit the pull request
fixes eloqdb/tx_service#issue_id./mtr --suite=mono_main,mono_multi,mono_basicSummary by CodeRabbit
Bug Fixes
Refactor